forked from ChimeraCoder/anaconda
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtwitter.go
302 lines (262 loc) · 9.11 KB
/
twitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
//Package anaconda provides structs and functions for accessing version 1.1
//of the Twitter API.
//
//Successful API queries return native Go structs that can be used immediately,
//with no need for type assertions.
//
//Authentication
//
//If you already have the access token (and secret) for your user (Twitter provides this for your own account on the developer portal), creating the client is simple:
//
// anaconda.SetConsumerKey("your-consumer-key")
// anaconda.SetConsumerSecret("your-consumer-secret")
// api := anaconda.NewTwitterApi("your-access-token", "your-access-token-secret")
//
//
//Queries
//
//Executing queries on an authenticated TwitterApi struct is simple.
//
// searchResult, _ := api.GetSearch("golang", nil)
// for _ , tweet := range searchResult.Statuses {
// fmt.Print(tweet.Text)
// }
//
//Certain endpoints accept additional optional parameters; if desired, these can be passed as the final argument.
//
// v := url.Values{}
// v.Set("count", "30")
// result, err := api.GetSearch("golang", v)
//
//
//Endpoints
//
//Anaconda implements most of the endpoints defined in the Twitter API documentation: https://dev.twitter.com/docs/api/1.1.
//For clarity, in most cases, the function name is simply the name of the HTTP method and the endpoint (e.g., the endpoint `GET /friendships/incoming` is provided by the function `GetFriendshipsIncoming`).
//
//In a few cases, a shortened form has been chosen to make life easier (for example, retweeting is simply the function `Retweet`)
//
//More detailed information about the behavior of each particular endpoint can be found at the official Twitter API documentation.
package anaconda
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"strings"
"github.com/ChimeraCoder/tokenbucket"
"github.com/garyburd/go-oauth/oauth"
)
// HTTP method selectors accepted by execQuery.
const (
	_GET = iota
	_POST
)

// Base URLs of the Twitter API endpoints.
const (
	BaseUrlV1     = "https://api.twitter.com/1"
	BaseUrl       = "https://api.twitter.com/1.1"
	UploadBaseUrl = "https://upload.twitter.com/1.1"
)
// oauthClient is the package-level OAuth client shared by every TwitterApi.
// It is configured with Twitter's request-token, authentication and
// access-token URIs; its consumer credentials are filled in by SetConsumerKey
// and SetConsumerSecret.
var oauthClient = oauth.Client{
TemporaryCredentialRequestURI: "https://api.twitter.com/oauth/request_token",
ResourceOwnerAuthorizationURI: "https://api.twitter.com/oauth/authenticate",
TokenRequestURI: "https://api.twitter.com/oauth/access_token",
}
// TwitterApi is a client bound to one user's access credentials.
// Queries placed on queryQueue are executed by the throttledQuery goroutine
// that NewTwitterApi starts.
type TwitterApi struct {
// Credentials holds the user's access token and secret; they are passed to
// the OAuth client on every GET and POST.
Credentials *oauth.Credentials
// queryQueue carries pending queries to the throttledQuery worker.
queryQueue chan query
// bucket throttles queries when non-nil; a nil bucket means throttling is disabled.
bucket *tokenbucket.Bucket
// returnRateLimitError, when true, hands rate-limit errors back to the
// caller instead of re-queuing the query.
returnRateLimitError bool
// HttpClient is the HTTP client used by apiGet and apiPost.
HttpClient *http.Client
// Currently used only for the streaming API
// and for checking rate-limiting headers
// Default logger is silent
Log Logger
// used for testing
// defaults to BaseUrl
baseUrl string
}
// query describes one API request waiting on a TwitterApi's queryQueue.
type query struct {
// url is the endpoint to request.
url string
// form holds the request parameters.
form url.Values
// data is the destination the decoded JSON response is written into.
data interface{}
// method selects the HTTP verb: _GET or _POST.
method int
// response_ch receives the outcome once the query has been executed.
response_ch chan response
}
// response is the outcome of an executed query, delivered on query.response_ch.
type response struct {
// data is the same destination value that was supplied in the query.
data interface{}
// err is the error returned by executing the query, or nil on success.
err error
}
// Default throttling parameters.
// NOTE(review): neither constant is referenced in this part of the file;
// presumably they are the default delay and bucket capacity - confirm usage.
const (
	DEFAULT_DELAY    = 0 * time.Second
	DEFAULT_CAPACITY = 5
)
// NewTwitterApi takes a user-specific access token and secret and returns a
// TwitterApi for that user. The returned client can be used to access any of
// the available endpoints.
//
// The client starts with throttling disabled, the default HTTP client, a
// silent logger and BaseUrl as its base URL. A worker goroutine running
// throttledQuery is started; it runs until the query queue is closed (see Close).
func NewTwitterApi(access_token string, access_token_secret string) *TwitterApi {
	creds := &oauth.Credentials{
		Token:  access_token,
		Secret: access_token_secret,
	}

	api := new(TwitterApi)
	api.Credentials = creds
	// TODO figure out how much to buffer this channel.
	// With a non-buffered channel, callers block when multiple queries are
	// made at the same time.
	api.queryQueue = make(chan query)
	api.bucket = nil
	api.returnRateLimitError = false
	api.HttpClient = http.DefaultClient
	api.Log = silentLogger{}
	api.baseUrl = BaseUrl

	go api.throttledQuery()
	return api
}
// SetConsumerKey sets the application-specific consumer key used in the
// initial OAuth process. It is stored on the package-level OAuth client, so
// it applies to every TwitterApi in the process.
// This key is listed on https://dev.twitter.com/apps/YOUR_APP_ID/show
func SetConsumerKey(consumer_key string) {
oauthClient.Credentials.Token = consumer_key
}
// SetConsumerSecret sets the application-specific consumer secret used in the
// initial OAuth process. It is stored on the package-level OAuth client, so
// it applies to every TwitterApi in the process.
// This secret is listed on https://dev.twitter.com/apps/YOUR_APP_ID/show
func SetConsumerSecret(consumer_secret string) {
oauthClient.Credentials.Secret = consumer_secret
}
// ReturnRateLimitError specifies behavior when the Twitter API returns a rate-limit error.
// If set to true, the query will fail and return the error instead of automatically queuing and
// retrying the query when the rate limit expires.
// NOTE(review): the flag is read by the throttledQuery goroutine without
// synchronization, so it is safest to set it before issuing queries.
func (c *TwitterApi) ReturnRateLimitError(b bool) {
c.returnRateLimitError = b
}
// EnableThrottling turns on query throttling using the tokenbucket algorithm.
// rate and bufferSize are handed unchanged to tokenbucket.NewBucket; any
// bucket installed earlier is replaced.
func (c *TwitterApi) EnableThrottling(rate time.Duration, bufferSize int64) {
	throttle := tokenbucket.NewBucket(rate, bufferSize)
	c.bucket = throttle
}
// DisableThrottling turns off query throttling by discarding the token
// bucket; the worker skips token spending while the bucket is nil.
func (c *TwitterApi) DisableThrottling() {
c.bucket = nil
}
// SetDelay sets the delay between throttled queries.
// To turn off throttling, set it to 0 seconds.
//
// SetDelay has no effect while throttling is disabled, which is the state of
// a freshly created client and the state after DisableThrottling: there is no
// bucket whose rate could be changed. Call EnableThrottling first. Without
// this guard the call would dereference a nil bucket and panic.
func (c *TwitterApi) SetDelay(t time.Duration) {
	if c.bucket == nil {
		return
	}
	c.bucket.SetRate(t)
}
// GetDelay returns the current delay between throttled queries.
//
// It returns 0 while throttling is disabled, which is the state of a freshly
// created client and the state after DisableThrottling. Without this guard
// the call would dereference a nil bucket and panic.
func (c *TwitterApi) GetDelay() time.Duration {
	if c.bucket == nil {
		return 0
	}
	return c.bucket.GetRate()
}
// SetBaseUrl overrides the base URL stored on the client (BaseUrl by default).
// SetBaseUrl is experimental and may be removed in future releases.
func (c *TwitterApi) SetBaseUrl(baseUrl string) {
c.baseUrl = baseUrl
}
// AuthorizationURL generates the authorization URL for the first part of the
// OAuth handshake. Redirect the user to this URL.
// This assumes that the consumer key has already been set (using SetConsumerKey).
//
// It returns the URL together with the temporary credentials that must later
// be passed to GetCredentials. On failure the URL is empty and the
// credentials are nil.
func AuthorizationURL(callback string) (string, *oauth.Credentials, error) {
	requestCreds, err := oauthClient.RequestTemporaryCredentials(http.DefaultClient, callback, nil)
	if err != nil {
		return "", nil, err
	}

	authURL := oauthClient.AuthorizationURL(requestCreds, nil)
	return authURL, requestCreds, nil
}
// GetCredentials completes the OAuth handshake: it exchanges the temporary
// credentials obtained from AuthorizationURL, together with the verifier, for
// the user's access credentials. The url.Values result is whatever the OAuth
// client's RequestToken returns alongside the credentials.
func GetCredentials(tempCred *oauth.Credentials, verifier string) (*oauth.Credentials, url.Values, error) {
	accessCreds, extra, err := oauthClient.RequestToken(http.DefaultClient, tempCred, verifier)
	return accessCreds, extra, err
}
// cleanValues guarantees a usable url.Values: a nil argument is replaced by a
// fresh empty map, anything else is returned unchanged (not copied).
func cleanValues(v url.Values) url.Values {
	if v != nil {
		return v
	}
	return make(url.Values)
}
// apiGet performs an OAuth-signed GET against urlStr with the given form
// values and decodes the JSON response into data. It returns the transport
// error, or whatever decodeResponse reports.
func (c TwitterApi) apiGet(urlStr string, form url.Values, data interface{}) error {
	httpResp, reqErr := oauthClient.Get(c.HttpClient, c.Credentials, urlStr, form)
	if reqErr != nil {
		return reqErr
	}
	// The body is closed once decoding has finished.
	defer httpResp.Body.Close()

	return decodeResponse(httpResp, data)
}
// apiPost performs an OAuth-signed POST against urlStr with the given form
// values and decodes the JSON response into data. It returns the transport
// error, or whatever decodeResponse reports.
func (c TwitterApi) apiPost(urlStr string, form url.Values, data interface{}) error {
	httpResp, reqErr := oauthClient.Post(c.HttpClient, c.Credentials, urlStr, form)
	if reqErr != nil {
		return reqErr
	}
	// The body is closed once decoding has finished.
	defer httpResp.Body.Close()

	return decodeResponse(httpResp, data)
}
// decodeResponse decodes the JSON body of a Twitter API response into data.
//
// Requests whose URL ends in "upload.json" are treated specially because,
// according to dev.twitter.com, chunked upload append returns HTTP 2XX: a 204
// is accepted without decoding, any other 2XX is decoded, and everything else
// is an API error. For all other URLs only a 200 is decoded; any other status
// is returned as an API error.
func decodeResponse(resp *http.Response, data interface{}) error {
	status := resp.StatusCode
	isUpload := strings.HasSuffix(resp.Request.URL.String(), "upload.json")

	switch {
	case isUpload && status == 204:
		// empty response, don't decode
		return nil
	case isUpload && (status < 200 || status >= 300):
		return newApiError(resp)
	case !isUpload && status != 200:
		return newApiError(resp)
	}

	return json.NewDecoder(resp.Body).Decode(data)
}
// NewApiError builds an ApiError from an HTTP response, recording its status
// code, headers, request URL and the full response body as a string.
func NewApiError(resp *http.Response) *ApiError {
	// Best effort: a read failure is ignored and whatever part of the body
	// was read is kept.
	raw, _ := ioutil.ReadAll(resp.Body)

	apiErr := new(ApiError)
	apiErr.StatusCode = resp.StatusCode
	apiErr.Header = resp.Header
	apiErr.Body = string(raw)
	apiErr.URL = resp.Request.URL
	return apiErr
}
// execQuery executes a query against urlStr, sending the values in form, and
// decodes the response JSON into data.
// method must be either _GET or _POST; any other value yields an error.
func (c TwitterApi) execQuery(urlStr string, form url.Values, data interface{}, method int) error {
	if method == _GET {
		return c.apiGet(urlStr, form, data)
	}
	if method == _POST {
		return c.apiPost(urlStr, form, data)
	}
	return fmt.Errorf("HTTP method not yet supported")
}
// throttledQuery is the worker loop of a TwitterApi: it executes queued
// queries one at a time, spending one token from the bucket per query when
// throttling is enabled.
// It is the only function that reads from the queryQueue for a particular
// *TwitterApi struct, and it returns when the queue is closed.
//
// When Twitter answers with a rate-limit error and returnRateLimitError is
// false, the query is re-queued and the worker sleeps until the next
// rate-limit window before continuing. In every other case the result (or
// error) is delivered on the query's response channel.
func (c *TwitterApi) throttledQuery() {
	for q := range c.queryQueue {
		urlStr := q.url
		form := q.form
		data := q.data // This is where the actual response will be written
		method := q.method
		response_ch := q.response_ch

		if c.bucket != nil {
			<-c.bucket.SpendToken(1)
		}

		err := c.execQuery(urlStr, form, data, method)

		// Check if Twitter returned a rate-limiting error
		if err != nil {
			if apiErr, ok := err.(*ApiError); ok {
				if isRateLimitError, nextWindow := apiErr.RateLimitCheck(); isRateLimitError && !c.returnRateLimitError {
					c.Log.Info(apiErr.Error())

					// If this is a rate-limiting error, re-add the job to the queue.
					// The query is passed as an argument rather than captured:
					// before Go 1.22 the range variable is shared between
					// iterations, so a captured q could be overwritten by the
					// next receive before this goroutine reads it.
					// TODO it really should preserve order
					// NOTE(review): this send panics if Close has been called
					// in the meantime.
					go func(retry query) {
						c.queryQueue <- retry
					}(q)

					// Wait for the next rate-limit window.
					delay := nextWindow.Sub(time.Now())
					<-time.After(delay)

					// Drain the bucket (start over fresh)
					if c.bucket != nil {
						c.bucket.Drain()
					}

					continue
				}
			}
		}

		response_ch <- response{data, err}
	}
}
// Close closes the query queue, which ends the throttledQuery worker loop.
// NOTE(review): sending on the queue after Close panics; this includes the
// retry goroutine that throttledQuery spawns on a rate-limit error.
func (c *TwitterApi) Close() {
close(c.queryQueue)
}